package org.codehaus.activemq.store.jdbc;

import java.sql.Connection;
import java.sql.SQLException;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.SubscriberEntry;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.store.TopicMessageStore;
import org.codehaus.activemq.util.JMSExceptionHelper;

/**
 * JDBC backed {@link TopicMessageStore}. On top of the message persistence
 * inherited from {@link JDBCMessageStore}, this class records the last
 * acknowledged message and the subscriber entry of each subscription by
 * delegating to the {@link JDBCAdapter}.
 * <p>
 * Every database operation uses the connection returned by
 * {@link TransactionContext#getConnection()}; this class neither commits nor
 * closes that connection itself.
 */
public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
	private static final Log log = LogFactory.getLog(JDBCTopicMessageStore.class);

	/**
	 * Container handed to subscriptions while replaying messages. Remains
	 * <code>null</code> until {@link #setMessageContainer(MessageContainer)}
	 * has been called.
	 */
	private MessageContainer container;

	/**
	 * Creates a store for a single destination.
	 *
	 * @param adapter         adapter used to run the SQL statements
	 * @param wireFormat      wire format passed on to the parent store
	 * @param destinationName name of the destination this store serves
	 */
	public JDBCTopicMessageStore(JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
		super(adapter, wireFormat, destinationName);
	}

	/**
	 * Persists the sequence number of the last message acknowledged by the
	 * given subscription.
	 *
	 * @param subscription    subscription whose persistent key identifies the row to update
	 * @param messageIdentity identity of the acknowledged message; its sequence
	 *                        number must be a {@link Long}
	 * @throws JMSException if the underlying SQL operation fails
	 */
	public void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity)
			throws JMSException {
		// The sequence number is exposed as an Object; this store only ever deals in Longs.
		long seq = ((Long) messageIdentity.getSequenceNumber()).longValue();
		try {
			Connection c = TransactionContext.getConnection();
			this.adapter.doSetLastAck(c, subscription.getPersistentKey(), this.destinationName, seq);
		} catch (SQLException e) {
			throw JMSExceptionHelper.newJMSException("Failed to store ack for: " + subscription + " on message "
					+ messageIdentity + " in container: " + e, e);
		}
	}

	/**
	 * Returns an identity that carries only the last sequence id handed out by
	 * the sequence generator; the message id part is <code>null</code>.
	 *
	 * @return identity holding the most recently generated sequence id
	 * @throws JMSException declared by the interface; not thrown by this implementation directly
	 */
	public MessageIdentity getLastestMessageIdentity() throws JMSException {
		return new MessageIdentity(null, new Long(this.sequenceGenerator.getLastSequenceId()));
	}

	/**
	 * Replays the stored messages of a subscription. For every (sequence,
	 * message id) pair reported by the adapter the message is loaded through
	 * {@link #getMessage(MessageIdentity)} and added to the subscription
	 * together with the configured message container.
	 *
	 * @param subscription          subscription to recover; its persistent key selects the messages
	 * @param lastDispatchedMessage not used by this implementation
	 * @throws JMSException if the underlying SQL operation fails or a message cannot be delivered
	 */
	public void recoverSubscription(final Subscription subscription, final MessageIdentity lastDispatchedMessage)
			throws JMSException {
		try {
			Connection c = TransactionContext.getConnection();
			this.adapter.doRecoverSubscription(c, subscription.getPersistentKey(), this.destinationName,
					new JDBCAdapter.MessageListResultHandler() {
						public void onMessage(long seq, String messageID) throws JMSException {
							MessageIdentity messageIdentity = new MessageIdentity(messageID, new Long(seq));
							ActiveMQMessage message = getMessage(messageIdentity);
							// container is whatever setMessageContainer() last stored (null if never called)
							subscription.addMessage(container, message);
						}
					});
		} catch (SQLException e) {
			throw JMSExceptionHelper
					.newJMSException("Failed to recover subscription: " + subscription + ". Reason: " + e, e);
		}
	}

	/**
	 * Stores the subscriber entry for the consumer identified by
	 * {@link ConsumerInfo#getConsumerKey()}.
	 *
	 * @param info            consumer whose key identifies the entry
	 * @param subscriberEntry entry to persist
	 * @throws JMSException if the underlying SQL operation fails
	 */
	public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
		String key = info.getConsumerKey();
		try {
			Connection c = TransactionContext.getConnection();
			this.adapter.doSetSubscriberEntry(c, this.destinationName, key, subscriberEntry);
		} catch (SQLException e) {
			throw JMSExceptionHelper
					.newJMSException("Failed to store subscription for info: " + info + ". Reason: " + e, e);
		}
	}

	/**
	 * Looks up the subscriber entry for the consumer identified by
	 * {@link ConsumerInfo#getConsumerKey()}.
	 *
	 * @param info consumer whose key identifies the entry
	 * @return whatever the adapter returns for that key
	 * @throws JMSException if the underlying SQL operation fails
	 */
	public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
		String key = info.getConsumerKey();
		try {
			Connection c = TransactionContext.getConnection();
			return this.adapter.doGetSubscriberEntry(c, this.destinationName, key);
		} catch (SQLException e) {
			throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e,
					e);
		}
	}

	/**
	 * Sets the container that is passed to subscriptions during
	 * {@link #recoverSubscription(Subscription, MessageIdentity)}.
	 *
	 * @param container container to hand to recovered subscriptions
	 */
	public void setMessageContainer(MessageContainer container) {
		this.container = container;
	}

	/**
	 * Intentionally empty: this store keeps no per-message reference count.
	 *
	 * @param messageId ignored
	 * @throws JMSException never thrown by this implementation
	 */
	public void incrementMessageCount(MessageIdentity messageId) throws JMSException {
	}

	/**
	 * Intentionally empty: this store keeps no per-message reference count and
	 * deletes nothing here.
	 *
	 * @param messageIdentity ignored
	 * @param ack             ignored
	 * @throws JMSException never thrown by this implementation
	 */
	public void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack)
			throws JMSException {
	}
}